Hologres与Flink全托管高度兼容,多数情况下您可以使用Flink SQL的方式,声明Hologres的源表、维表及结果表,进而使用SQL表达数据的处理逻辑。但对于特殊业务场景,Flink SQL方式无法满足业务计算时,您需要使用DataStream的方式读写数据。本文以VVR-8.0.8-Flink-1.17版本为例,为您完整地展示如何调试和开发基于Hologres连接器的DataStream作业。
前提条件
已购买Hologres实例并创建数据库。详情请参见创建数据库。
已安装代码开发平台,用于本地代码调试,如IntelliJ IDEA。
步骤一:下载Connector依赖
通过DataStream的方式读写Hologres数据时,您需要下载Hologres连接器连接Flink全托管。目前已发布的连接器版本请参见Hologres DataStream连接器。
您需要下载如下2个依赖JAR包:
ververica-connector-hologres-1.17-vvr-8.0.8.jar:用于本地调试。
ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar:用于本地调试和线上部署。
说明从VVR-6.0-Flink-1.15版本起,商业版Connector在本地调试时,需要配合相应版本的Uber JAR使用。使用方法请参见本地运行和调试包含连接器的作业。
下载后,使用如下命令将ververica-connector-hologres-1.17-vvr-8.0.8.jar安装至本地Maven仓库中:
mvn install:install-file -Dfile=$path/ververica-connector-hologres-1.17-vvr-8.0.8.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.17-vvr-8.0.8 -Dpackaging=jar
其中
$path
为您本地存放ververica-connector-hologres-1.17-vvr-8.0.8.jar的绝对路径。
步骤二:本地开发及调试
您需要在本地完成项目开发,再在Flink全托管控制台上部署并运行。以Binlog源表为例,项目代码及pom.xml文件如下:
本地代码编写:
DataStream API Demo代码:
import com.alibaba.ververica.connectors.hologres.binlog.HologresBinlogConfigs; import com.alibaba.ververica.connectors.hologres.binlog.StartupMode; import com.alibaba.ververica.connectors.hologres.binlog.source.HologresBinlogSource; import com.alibaba.ververica.connectors.hologres.config.HologresConnectionParam; import com.alibaba.ververica.connectors.hologres.config.JDBCOptions; import com.alibaba.ververica.connectors.hologres.utils.JDBCUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import java.util.Collections; public class HologresBinlogSourceDemo { public static void main(String[] args) throws Exception { Configuration envConf = new Configuration(); // 本地调试时,需要指定uber jar的绝对路径;打包上传时请注释掉 envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>"); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(envConf); // 初始化读取的表的Schema,需要和Hologres表的字段匹配,可以只定义部分字段。 TableSchema schema = TableSchema.builder() .field("<id>", DataTypes.INT().notNull()) .primaryKey("<id>") .build(); // Hologres的相关参数。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "<yourEndpoint>"); config.setString(HologresConfigs.USERNAME, "<yourUserName>"); config.setString(HologresConfigs.PASSWORD, "<yourPassword>"); config.setString(HologresConfigs.DATABASE, "<yourDatabaseName>"); config.setString(HologresConfigs.TABLE, "<yourTableName>"); config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true); config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true); // 构建JDBC Options。 JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config); // 构建Hologres Binlog Source。 long startTimeMs = 0; HologresBinlogSource source = new HologresBinlogSource( new HologresConnectionParam(config), schema, config, jdbcOptions, startTimeMs, StartupMode.INITIAL, "", "", -1, Collections.emptySet()); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print(); env.execute(); } }
参数说明:
参数
描述
path_to_uber_jar
本地Uber JAR的绝对路径。对于Windows需要加相应磁盘分区,例如
file:///D:/path/to/a-uber.jar
。id
始化读取的表的Schema,需要和Hologres表的字段匹配,可以只定义部分字段。
yourEndpoint
Hologres实例的网络域名。您可以进入Hologres管理控制台的实例详情页,从网络信息中获取域名。
yourUserName
阿里云账号的AccessKey ID。您可以单击AccessKey 管理,获取AccessKey ID。
yourPassword
对应阿里云账号的AccessKey Secret。
yourDatabaseName
Hologres数据库名称。
yourTableName
待读取的Hologres表名称。
pom.xml文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-flink-demo</artifactId> <version>1.0-SNAPSHOT</version> <packaging>jar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.17.2</flink.version> <vvr.version>1.17-vvr-8.0.8</vvr.version> <target.java.version>1.8</target.java.version> <scala.binary.version>2.12</scala.binary.version> <maven.compiler.source>${target.java.version}</maven.compiler.source> <maven.compiler.target>${target.java.version}</maven.compiler.target> <log4j.version>1.7.21</log4j.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>${vvr.version}</version> </dependency> <!-- 日志实现 log4j 依赖 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${log4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${log4j.version}</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.1.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <createDependencyReducedPom>false</createDependencyReducedPom> <shadedArtifactAttached>true</shadedArtifactAttached> <shadedClassifierName>jar-with-dependencies</shadedClassifierName> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
本地调试及运行。
您需要配置运行所需要的ClassLoader JAR包,即ververica-classloader-1.15-vvr-6.0-SNAPSHOT.jar。具体操作请参见步骤二:配置运行所需要的ClassLoader JAR包。
(可选)如果提示缺少一些常见的Flink类无法执行,例如
org.apache.flink.configuration.Configuration
,需要在“Modify options”处勾选“Add dependencies with provided scope to classpath”。
配置完成后,您可在本地调试运行该项目,确保本地可运行成功。
本地调试步骤详情请参见本地运行和调试包含连接器的作业。
步骤三:打包运行
本地调试成功后,您可将其进行打包并与Uber JAR一同上传至Flink。
打包前,注释掉下述代码:
envConf.setString("pipeline.classpaths", "file://" + "<path_to_uber_jar>");
编译打包。
使用Maven编译并打包应用程序及其依赖项。命令如下:
mvn clean package -DskipTests
打包成功后,即可在本地生成名为hologres-flink-demo-1.0-SNAPSHOT-jar-with-dependencies.jar的文件。
上传JAR包。
在Flink控制台的资源管理页面上传打包好的程序JAR包和ververica-connector-hologres-1.17-vvr-8.0.8-uber.jar。具体操作请参见步骤二:上传测试JAR包和数据文件。
部署JAR作业。
在Flink控制台的作业运维页面部署JAR作业。具体操作及参数信息请参见步骤三:部署JAR作业。
启动并查看Flink计算结果。
说明若更新JAR包,需要重新上传部署JAR包并启动作业。
在Flink控制台的作业运维页面,单击目标作业名称操作列中的启动。
配置资源信息和基础设置。
作业启动参数配置详情请参见作业启动。
单击启动。
单击启动后,作业状态变为运行中,则代表作业运行正常。
常见问题
问题1:当您在IntelliJ IDEA中运行和调试Flink作业时,如果其包含了阿里云实时计算Flink版的商业版连接器依赖,可能会遇到无法找到连接器相关类的运行错误。例如
Caused by: java.lang.ClassNotFoundException: com.alibaba.ververica.connectors.hologres.binlog.source.reader.HologresBinlogRecordEmitter
。问题原因:此类异常一般是由于本地调试没有正确使用uber jar导致的
解决方法:请参考本文或者本地运行和调试包含连接器的作业正确使用Uber JAR进行调试。
问题2:提示缺少一些常见的Flink类无法执行,例如
Caused by: java.lang.ClassNotFoundException: org.apache.flink.configuration.Configuration
。问题原因:可能是缺少依赖或者没有正常加载依赖。
解决方法:
pom.xml文件中没有引入相关依赖,大多数情况下可能是flink-connector-base,也可以搜索异常包路径,查看其属于哪个Flink依赖。
可能是运行时没有加载provided依赖。需要在IntelliJ IDEA的“Modify options”处勾选“Add dependencies with provided scope to classpath”。
问题3:运行中报错
Incompatible magic value
。问题原因:
原因一:可能是使用的Uber JAR与Connector版本不一致。
原因二:可能是ClassLoader设置有误。
解决方法:
对于原因一:可参考本文选择对应版本的Connector和Uber JAR。
对于原因二:请参考配置运行所需要的ClassLoader JAR包重新设置。
问题4:运行时抛出异常
Unable to load flink-decryption library java.io.FileNotFoundException: Decryption library native/windows/x86/flink-decryption.dll not found
。问题原因:目前Uber JAR不支持Windows系统32位的Java。
解决方法:请安装64位的Java,可以通过
java -version
命令查看Java安装信息,如果不包含64-Bit
字样,表明是32位的Java。
问题5:运行时抛出
Caused by: java.lang.ClassFormatError
。问题原因:可能是由于IntelliJ IDEA配置的JDK版本问题导致。
解决方法:请使用较新的JDK8或者JDK11版本。